{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "f160df58-50da-4277-909f-4d4374ff6607",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import types\n",
    "\n",
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "f67fcacd-c4d7-4602-80d4-7e6f707ea3e3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'3.3.2'"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark = SparkSession.builder \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .appName('homework') \\\n",
    "    .getOrCreate()\n",
    "\n",
    "spark.version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "503dac17-04c8-4f01-9bde-a716411a7f71",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.parquet('yellow_tripdata_2024-10.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "30e46144-dadc-41e3-8f44-73ca511de5e9",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "c8dc58db-b3c1-4745-9192-0325b72f94f4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 30, 44), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 48, 26), passenger_count=1, trip_distance=3.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=246, payment_type=1, fare_amount=18.4, extra=1.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=24.9, congestion_surcharge=2.5, Airport_fee=0.0),\n",
       " Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 12, 20), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 25, 25), passenger_count=1, trip_distance=2.2, RatecodeID=1, store_and_fwd_flag='N', PULocationID=48, DOLocationID=236, payment_type=1, fare_amount=14.2, extra=3.5, mta_tax=0.5, tip_amount=3.8, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=23.0, congestion_surcharge=2.5, Airport_fee=0.0),\n",
       " Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 4, 46), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 13, 52), passenger_count=1, trip_distance=2.7, RatecodeID=1, store_and_fwd_flag='N', PULocationID=142, DOLocationID=24, payment_type=1, fare_amount=13.5, extra=3.5, mta_tax=0.5, tip_amount=3.7, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=22.2, congestion_surcharge=2.5, Airport_fee=0.0),\n",
       " Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 12, 10), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 23, 1), passenger_count=1, trip_distance=3.1, RatecodeID=1, store_and_fwd_flag='N', PULocationID=233, DOLocationID=75, payment_type=1, fare_amount=14.2, extra=3.5, mta_tax=0.5, tip_amount=2.0, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=21.2, congestion_surcharge=2.5, Airport_fee=0.0),\n",
       " Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 30, 22), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 30, 39), passenger_count=1, trip_distance=0.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=262, DOLocationID=262, payment_type=3, fare_amount=3.0, extra=3.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=8.0, congestion_surcharge=2.5, Airport_fee=0.0)]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "7bc5e0b1-db5a-4e5e-83d9-49e5c7b3570e",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.repartition(4)\n",
    "df.write.parquet('data-out')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "e4418bd5-652b-4e22-a9bb-e3759fc89b3a",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.createOrReplaceTempView('hw5')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "694ac5db-91bc-43c6-958f-556058a78120",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total 97M\n",
      "-rw-r--r-- 1 alexey 197121   0 Feb 22 15:17 _SUCCESS\n",
      "-rw-r--r-- 1 alexey 197121 25M Feb 22 15:17 part-00000-0bd1f6a4-de39-45d9-aa4d-657ac8650605-c000.snappy.parquet\n",
      "-rw-r--r-- 1 alexey 197121 25M Feb 22 15:17 part-00001-0bd1f6a4-de39-45d9-aa4d-657ac8650605-c000.snappy.parquet\n",
      "-rw-r--r-- 1 alexey 197121 25M Feb 22 15:17 part-00002-0bd1f6a4-de39-45d9-aa4d-657ac8650605-c000.snappy.parquet\n",
      "-rw-r--r-- 1 alexey 197121 25M Feb 22 15:17 part-00003-0bd1f6a4-de39-45d9-aa4d-657ac8650605-c000.snappy.parquet\n"
     ]
    }
   ],
   "source": [
    "!ls -lh data-out"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "79d291c4-2afb-4dc5-b56b-5783074880da",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "125567"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.withColumn('pickup_date', F.to_date(df.tpep_pickup_datetime)) \\\n",
    "    .filter(\"pickup_date = '2024-10-15'\") \\\n",
    "    .count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "ca28ae54-425d-4aa2-aaec-001feb5793fe",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|  125567|\n",
      "+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql(\"\"\" \n",
    "SELECT count(*)\n",
    "FROM hw5\n",
    "WHERE cast(tpep_pickup_datetime as date) = '2024-10-15'\n",
    "and tpep_pickup_datetime is not null\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "e33c9139-3ad3-45f9-a751-bef38f3cfae0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|          duration|\n",
      "+------------------+\n",
      "| 162.6177777777778|\n",
      "|           143.325|\n",
      "|137.76055555555556|\n",
      "|114.83472222222221|\n",
      "| 89.89833333333333|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.withColumn('duration', (df.tpep_dropoff_datetime.cast('long') - df.tpep_pickup_datetime.cast('long'))/60/60) \\\n",
    "    .orderBy('duration', ascending=False) \\\n",
    "    .limit(5) \\\n",
    "    .select(\"duration\") \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "id": "9692a04e-52d2-4330-839a-bcc938aeaba1",
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "taxi_zone_schema = types.StructType([\n",
    "    types.StructField('LocationID', types.IntegerType(), True),\n",
    "    types.StructField('Borough', types.StringType(), True),\n",
    "    types.StructField('Zone', types.StringType(), True),\n",
    "    types.StructField('service_zone', types.StringType(), True)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "id": "778ab687-c5ed-4d25-acf5-60d27978b269",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones = spark.read \\\n",
    "    .option(\"header\", \"true\") \\\n",
    "    .schema(taxi_zone_schema) \\\n",
    "    .csv('taxi_zone_lookup.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "id": "2e97dfa2-2f31-4fad-b787-e621a0154592",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-------------+--------------------+------------+\n",
      "|LocationID|      Borough|                Zone|service_zone|\n",
      "+----------+-------------+--------------------+------------+\n",
      "|         1|          EWR|      Newark Airport|         EWR|\n",
      "|         2|       Queens|         Jamaica Bay|   Boro Zone|\n",
      "|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|\n",
      "|         4|    Manhattan|       Alphabet City| Yellow Zone|\n",
      "|         5|Staten Island|       Arden Heights|   Boro Zone|\n",
      "|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|\n",
      "|         7|       Queens|             Astoria|   Boro Zone|\n",
      "|         8|       Queens|        Astoria Park|   Boro Zone|\n",
      "|         9|       Queens|          Auburndale|   Boro Zone|\n",
      "|        10|       Queens|        Baisley Park|   Boro Zone|\n",
      "|        11|     Brooklyn|          Bath Beach|   Boro Zone|\n",
      "|        12|    Manhattan|        Battery Park| Yellow Zone|\n",
      "|        13|    Manhattan|   Battery Park City| Yellow Zone|\n",
      "|        14|     Brooklyn|           Bay Ridge|   Boro Zone|\n",
      "|        15|       Queens|Bay Terrace/Fort ...|   Boro Zone|\n",
      "|        16|       Queens|             Bayside|   Boro Zone|\n",
      "|        17|     Brooklyn|             Bedford|   Boro Zone|\n",
      "|        18|        Bronx|        Bedford Park|   Boro Zone|\n",
      "|        19|       Queens|           Bellerose|   Boro Zone|\n",
      "|        20|        Bronx|             Belmont|   Boro Zone|\n",
      "+----------+-------------+--------------------+------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_zones.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "id": "c6587d24-d8ae-4d09-9ec1-f6aae6e9ddb7",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones.createOrReplaceTempView(\"zones\")\n",
    "df.createOrReplaceTempView(\"yellow\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "id": "68f6a7c9-e655-47a4-a8c1-37428b1f1dd1",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Zone=\"Governor's Island/Ellis Island/Liberty Island\", cnt=1),\n",
       " Row(Zone='Arden Heights', cnt=2),\n",
       " Row(Zone='Rikers Island', cnt=2),\n",
       " Row(Zone='Jamaica Bay', cnt=3),\n",
       " Row(Zone='Green-Wood Cemetery', cnt=3)]"
      ]
     },
     "execution_count": 41,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    zones.Zone,\n",
    "    COUNT(1) as cnt\n",
    "FROM\n",
    "    yellow y LEFT JOIN zones ON \n",
    "        y.PULocationID = zones.LocationID\n",
    "GROUP BY 1\n",
    "ORDER BY 2 ASC;\n",
    "\"\"\").take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "20423236-ae33-485b-ac6e-40fbb9427273",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "128893"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import pandas as pd\n",
    "df = pd.read_parquet('yellow_tripdata_2024-10.parquet')\n",
    "df['dt'] = df['tpep_pickup_datetime'].dt.date\n",
    "(df['dt']==pd.to_datetime('2024-10-15').date()).sum()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
